package com.bytecub.gateway.mq.consume;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.bytecub.common.biz.TopicBiz;
import com.bytecub.common.domain.gateway.mq.UpgradeMessageBo;
import com.bytecub.common.domain.message.DeviceDownMessage;
import com.bytecub.common.enums.UpgradeEnum;
import com.bytecub.common.enums.UpgradePushTypeEnum;
import com.bytecub.gateway.mq.mqttclient.BcPubMqttClient;
import com.bytecub.gateway.mq.storage.UpgradeStorage;
import com.bytecub.mdm.cache.IDeviceOfflineCache;
import com.bytecub.mdm.cache.IFirmwareCache;
import com.bytecub.mdm.service.ITaskDetailService;
import com.bytecub.protocol.base.IBaseProtocol;
import com.bytecub.protocol.domain.bo.DeviceProductBo;
import com.bytecub.protocol.service.IProtocolUtilService;
import com.bytecub.utils.IdGenerate;
import com.bytecub.utils.StringUtils;

import lombok.extern.slf4j.Slf4j;

/**
 *  * ByteCub.cn.  * Copyright (c) 2020-2020 All Rights Reserved.  *   * @author bytecub@163.com songbin
 *  * @version Id: MQTTConsumeMonitor.java, v 0.1 2020-12-09  Exp $$  
 */
@Service
@Slf4j
public class UpgradeConsume {
    @Autowired
    private BcPubMqttClient mqttClient;
    @Autowired
    private IFirmwareCache firmwareCache;
    @Autowired
    private IProtocolUtilService protocolUtilService;
    @Autowired
    private ITaskDetailService taskDetailService;
    @Autowired
    private IDeviceOfflineCache deviceOfflineCache;
    /***
     * 具体线程
     */
    @Async
    public void execute() {
        while (true) {
            try {
                UpgradeMessageBo msg = UpgradeStorage.pop();
                Boolean pubStatus = this.publish(msg);
                this.updateStatus(msg, pubStatus);
                if(!UpgradePushTypeEnum.URL.getType().equals(msg.getPushType())){
                    /**如果不是URL升级，因为推送内容过大，所以这里缓个3秒 再推送下一个*/
                    if(pubStatus){
                        /**离线设备就不下发，所以也就不需要延迟了*/
                        Thread.sleep(3000);
                    }

                }

            } catch (Exception e) {
                log.warn("内部队列消费异常", e);
            }
        }
    }
    /**推送消息到设备
     * @return  false:设备离线 不推送  true 正常推送
     * */
    private Boolean publish(UpgradeMessageBo msg){
        if(null == deviceOfflineCache.cacheReader(msg.getDeviceCode())){
            String messageId = IdGenerate.genId();
            msg.setMessageId(messageId);
            return false;
        }


        String fileBase = firmwareCache.reader(msg.getId());
        if(StringUtils.isEmpty(fileBase)){
            fileBase = firmwareCache.writer(msg.getId(), msg.getUrl());
        }
        msg.setFileBase64(fileBase);
        msg.setUrl(msg.getUrl());
        DeviceProductBo deviceProductBo = protocolUtilService.queryInstanceByDeviceCode(msg.getDeviceCode());
        IBaseProtocol baseProtocol = deviceProductBo.getBaseProtocol();
        String topic = TopicBiz.buildUpgradeSet(msg.getDeviceCode(), msg.getProductCode());
        DeviceDownMessage deviceDownMessage = this.buildDownMessage(msg);
        byte[] publishMsg = baseProtocol.encode(topic, msg.getDeviceCode(), deviceDownMessage);
        mqttClient.publish(topic, publishMsg);
        return true;
    }
    /**
     * 任务明细表更新状态
     * */
    private void updateStatus(UpgradeMessageBo msg, Boolean pubStatus){
        if(!pubStatus){
            //这块是分布式的，所以不能这么判断，设备不在线 就不管了，不然会有多条数据
            //taskDetailService.update(msg, UpgradeEnum.STOP, "");
            return;
        }
        taskDetailService.update(msg, UpgradeEnum.SEND, "");
    }

    private DeviceDownMessage buildDownMessage(UpgradeMessageBo msg) {
        DeviceDownMessage deviceDownMessage = new DeviceDownMessage();
        deviceDownMessage.setProductCode(msg.getProductCode());
        deviceDownMessage.setDeviceCode(msg.getDeviceCode());
        if(UpgradePushTypeEnum.URL.getType().equals(msg.getPushType())){
            msg.setFileBase64(null);
        }
        //msg.setFileBase64(null);
        deviceDownMessage.setBody(msg);
        deviceDownMessage.setTimestamp(System.currentTimeMillis());
        String messageId = IdGenerate.genId();
        deviceDownMessage.setMessageId(messageId);
        msg.setMessageId(messageId);
        return deviceDownMessage;
    }

}
